D:\a\scloud-dns\scloud-dns\src\workers\mod.rs
Line | Count | Source |
1 | | use crate::exceptions::SCloudException; |
2 | | use crate::workers::manager::StartGate; |
3 | | use crate::workers::task::InFlightTask; |
4 | | use crate::{log_error, log_info, log_sdebug, log_strace}; |
5 | | use anyhow::Result; |
6 | | use serde::{Deserialize, Serialize}; |
7 | | use std::sync::Arc; |
8 | | use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, AtomicUsize, Ordering}; |
9 | | use tokio::sync::{Mutex, MutexGuard, Semaphore, mpsc}; |
10 | | |
11 | | pub(crate) mod manager; |
12 | | pub(crate) mod queue; |
13 | | pub(crate) mod reply_registry; |
14 | | pub(crate) mod task; |
15 | | pub(crate) mod tests; |
16 | | pub(crate) mod types; |
17 | | |
/// Shared state of a single worker in the DNS processing pipeline.
///
/// Every scalar field is an atomic and the channel lists sit behind async
/// mutexes, so all accessors on this type take `&self`.
#[allow(non_camel_case_types)]
#[derive(Debug)]
pub(crate) struct SCloudWorker {
    // IDENTITY
    /// Numeric worker identifier; initialised from `manager::generate_worker_id()`.
    pub(crate) worker_id: AtomicU64,
    /// `WorkerType` stored as its `u8` discriminant.
    pub(crate) worker_type: AtomicU8,

    // CHANNEL
    /// Outgoing task channels; drained (moved out) by `get_dns_tx`.
    pub(crate) dns_tx: Mutex<Vec<mpsc::Sender<InFlightTask>>>,
    /// Incoming task channels; drained (moved out) by `get_dns_rx`.
    pub(crate) dns_rx: Mutex<Vec<mpsc::Receiver<InFlightTask>>>,

    // RESOURCES/LIMITS
    pub(crate) stack_size_bytes: AtomicUsize,
    pub(crate) buffer_budget_bytes: AtomicUsize,
    pub(crate) max_stack_size_bytes: AtomicUsize,
    pub(crate) max_buffer_budget_bytes: AtomicUsize,

    // RUNTIME STATE
    /// `WorkerState` stored as its `u8` discriminant.
    pub(crate) state: AtomicU8,
    pub(crate) shutdown_requested: AtomicBool,
    /// `ShutdownMode` stored as its `u8` discriminant.
    pub(crate) shutdown_mode: AtomicU8,

    // BACKPRESSURE/IN-FLIGHT
    pub(crate) in_flight: AtomicUsize, // for metrics
    /// Semaphore whose available permits are reported by `get_in_flight_sem`.
    pub(crate) in_flight_sem: Arc<Semaphore>,
    pub(crate) max_in_flight: AtomicUsize,

    // METRICS
    pub(crate) jobs_done: AtomicU64,
    pub(crate) jobs_failed: AtomicU64,
    pub(crate) jobs_retried: AtomicU64,

    pub(crate) last_job_started_ms: AtomicU64,
    pub(crate) last_job_finished_ms: AtomicU64,

    pub(crate) last_error_code: AtomicU64,
    pub(crate) last_error_at_ms: AtomicU64,

    // CORRELATION/TRACING
    pub(crate) last_task_id_hi: AtomicU64, // 128-bit UUID split
    pub(crate) last_task_id_lo: AtomicU64,
}
60 | | |
61 | | impl SCloudWorker { |
62 | | const NEVER_APPLIED: u8 = 0xFF; |
63 | | |
64 | 69 | pub(crate) fn new(worker_type: WorkerType) -> Result<Self, SCloudException> { |
65 | 69 | Ok(Self { |
66 | 69 | worker_id: AtomicU64::new(manager::generate_worker_id()), |
67 | 69 | worker_type: AtomicU8::new(worker_type as u8), |
68 | 69 | dns_tx: Mutex::new(Vec::new()), |
69 | 69 | dns_rx: Mutex::new(Vec::new()), |
70 | 69 | stack_size_bytes: AtomicUsize::new(2 * 1024 * 1024), |
71 | 69 | buffer_budget_bytes: AtomicUsize::new(4 * 1024 * 1024), |
72 | 69 | max_stack_size_bytes: AtomicUsize::new(32 * 1024 * 1024), |
73 | 69 | max_buffer_budget_bytes: AtomicUsize::new(256 * 1024 * 1024), |
74 | 69 | state: AtomicU8::new(WorkerState::INIT as u8), |
75 | 69 | shutdown_requested: AtomicBool::new(false), |
76 | 69 | shutdown_mode: AtomicU8::new(ShutdownMode::GRACEFUL as u8), |
77 | 69 | in_flight: AtomicUsize::new(0), |
78 | 69 | in_flight_sem: Arc::new(Semaphore::new(512)), |
79 | 69 | max_in_flight: AtomicUsize::new(512), |
80 | 69 | jobs_done: AtomicU64::new(0), |
81 | 69 | jobs_failed: AtomicU64::new(0), |
82 | 69 | jobs_retried: AtomicU64::new(0), |
83 | 69 | last_job_started_ms: AtomicU64::new(0), |
84 | 69 | last_job_finished_ms: AtomicU64::new(0), |
85 | 69 | last_error_code: AtomicU64::new(0), |
86 | 69 | last_error_at_ms: AtomicU64::new(0), |
87 | 69 | last_task_id_hi: AtomicU64::new(0), |
88 | 69 | last_task_id_lo: AtomicU64::new(0), |
89 | 69 | }) |
90 | 69 | } |
91 | | |
92 | 18 | pub async fn run(self: Arc<Self>, gate: Option<Arc<StartGate>>) -> Result<(), SCloudException> { |
93 | 18 | log_sdebug!( |
94 | | "Running SCloudWorker [ID: {}][TYPE: {:?}]", |
95 | 18 | self.get_worker_id(), |
96 | 18 | self.get_worker_type() |
97 | | ); |
98 | | |
99 | 18 | if let Some(g) = gate { |
100 | 18 | g.done().await; |
101 | 0 | } |
102 | 18 | match WorkerType::try_from(self.worker_type.load(Ordering::Relaxed)).unwrap() { |
103 | | WorkerType::LISTENER => { |
104 | 2 | return Err(SCloudException::SCLOUD_WORKER_LISTENER_NO_SOCKET); |
105 | | } |
106 | | WorkerType::DECODER => { |
107 | 2 | self.clone().set_state(WorkerState::IDLE); |
108 | 2 | let (rx0 , tx0 ) = self.get_dns_rx_tx().await?; |
109 | 0 | types::decoder::run_dns_decoder(self.clone(), rx, tx).await?; |
110 | | } |
111 | | WorkerType::QUERY_DISPATCHER => { |
112 | 2 | self.clone().set_state(WorkerState::IDLE); |
113 | 2 | let (rx0 , tx0 ) = self.get_dns_rx_tx().await?; |
114 | 0 | types::query_dispatcher::run_dns_query_dispatcher(self.clone(), rx, tx).await?; |
115 | | } |
116 | | WorkerType::CACHE_LOOKUP => { |
117 | 2 | self.clone().set_state(WorkerState::IDLE); |
118 | 2 | let (rx0 , tx0 ) = self.get_dns_rx_tx().await?; |
119 | 0 | types::cache_lookup::run_dns_cache_lookup(self.clone(), rx, tx).await?; |
120 | | } |
121 | | WorkerType::ZONE_MANAGER => { |
122 | 2 | self.clone().set_state(WorkerState::IDLE); |
123 | 2 | let (rx0 , tx0 ) = self.get_dns_rx_tx().await?; |
124 | 0 | types::zone_manager::run_dns_zone_manager(self.clone(), rx, tx).await?; |
125 | | } |
126 | | WorkerType::RESOLVER => { |
127 | 2 | self.clone().set_state(WorkerState::IDLE); |
128 | 2 | let (rx0 , tx0 ) = self.get_dns_rx_tx().await?; |
129 | 0 | types::resolver::run_dns_resolver(self.clone(), rx, tx).await?; |
130 | | } |
131 | | WorkerType::CACHE_WRITER => { |
132 | 2 | self.clone().set_state(WorkerState::IDLE); |
133 | 2 | let (rx0 , tx0 ) = self.get_dns_rx_tx().await?; |
134 | 0 | types::cache_writer::run_dns_cache_writer(self.clone(), rx, tx).await?; |
135 | | } |
136 | | WorkerType::ENCODER => { |
137 | 2 | self.clone().set_state(WorkerState::IDLE); |
138 | 2 | let (rx0 , tx0 ) = self.get_dns_rx_tx().await?; |
139 | 0 | types::encoder::run_dns_encoder(self.clone(), rx, tx).await?; |
140 | | } |
141 | | WorkerType::SENDER => { |
142 | 1 | self.clone().set_state(WorkerState::IDLE); |
143 | 1 | let rx0 = self.get_dns_rx().await?; |
144 | 0 | types::sender::run_dns_sender(self.clone(), rx).await?; |
145 | | } |
146 | | WorkerType::CACHE_JANITOR => { |
147 | 0 | self.clone().set_state(WorkerState::IDLE); |
148 | 0 | types::cache_janitor::run_dns_cache_janitor(self.clone()).await?; |
149 | | } |
150 | | WorkerType::METRICS => { |
151 | 0 | self.clone().set_state(WorkerState::IDLE); |
152 | 0 | types::metrics::start_otlp_logger().await; |
153 | | } |
154 | | WorkerType::TCP_ACCEPTOR => { |
155 | 1 | self.clone().set_state(WorkerState::IDLE); |
156 | 1 | let tx0 = self.get_dns_tx().await?; |
157 | 0 | types::tcp_acceptor::run_dns_tcp_acceptor(self.clone(), tx).await?; |
158 | | } |
159 | | WorkerType::DOH_ACCEPTOR => { |
160 | 0 | self.clone().set_state(WorkerState::IDLE); |
161 | 0 | let tx = self.get_dns_tx().await?; |
162 | 0 | types::doh_acceptor::run_dns_doh_acceptor(self.clone(), tx).await?; |
163 | | } |
164 | 0 | _ => {} |
165 | | } |
166 | 0 | Ok(()) |
167 | 18 | } |
168 | | |
169 | | #[inline] |
170 | 41 | pub fn get_worker_id(&self) -> u64 { |
171 | 41 | self.worker_id.load(Ordering::Relaxed) |
172 | 41 | } |
173 | | |
174 | | #[inline] |
175 | 50 | pub fn get_worker_type(&self) -> WorkerType { |
176 | 50 | WorkerType::try_from(self.worker_type.load(Ordering::Relaxed)).unwrap() |
177 | 50 | } |
178 | | |
179 | | #[inline] |
180 | 0 | pub async fn push_dns_rx(&self, rx: mpsc::Receiver<InFlightTask>) { |
181 | 0 | self.dns_rx.lock().await.push(rx); |
182 | 0 | } |
183 | | |
184 | | #[inline] |
185 | 0 | pub async fn push_dns_tx_many(&self, txs: Vec<mpsc::Sender<InFlightTask>>) { |
186 | 0 | self.dns_tx.lock().await.extend(txs); |
187 | 0 | } |
188 | | |
189 | | #[inline] |
190 | 15 | pub async fn get_dns_rx_tx( |
191 | 15 | &self, |
192 | 15 | ) -> Result< |
193 | 15 | ( |
194 | 15 | Vec<mpsc::Receiver<InFlightTask>>, |
195 | 15 | Vec<mpsc::Sender<InFlightTask>>, |
196 | 15 | ), |
197 | 15 | SCloudException, |
198 | 15 | > { |
199 | 15 | Ok((self.get_dns_rx().await?7 , self8 .get_dns_tx().await?7 )) |
200 | 15 | } |
201 | | |
202 | | #[inline] |
203 | 10 | pub async fn get_dns_tx(&self) -> Result<Vec<mpsc::Sender<InFlightTask>>, SCloudException> { |
204 | 10 | let mut guard = self.dns_tx.lock().await; |
205 | 10 | if guard.is_empty() { |
206 | 8 | return Err(SCloudException::SCLOUD_WORKER_TX_NOT_SET); |
207 | 2 | } |
208 | 2 | Ok(std::mem::take(&mut *guard)) |
209 | 10 | } |
210 | | |
211 | | #[inline] |
212 | 17 | pub async fn get_dns_rx(&self) -> Result<Vec<mpsc::Receiver<InFlightTask>>, SCloudException> { |
213 | 17 | let mut guard = self.dns_rx.lock().await; |
214 | 17 | if guard.is_empty() { |
215 | 8 | return Err(SCloudException::SCLOUD_WORKER_RX_NOT_SET); |
216 | 9 | } |
217 | 9 | Ok(std::mem::take(&mut *guard)) |
218 | 17 | } |
219 | | |
220 | | #[inline] |
221 | 2 | pub fn get_stack_size_bytes(&self) -> usize { |
222 | 2 | self.stack_size_bytes.load(Ordering::Relaxed) |
223 | 2 | } |
224 | | |
225 | | #[inline] |
226 | 2 | pub fn get_buffer_budget_bytes(&self) -> usize { |
227 | 2 | self.buffer_budget_bytes.load(Ordering::Relaxed) |
228 | 2 | } |
229 | | |
230 | | #[inline] |
231 | 2 | pub fn get_max_stack_size_bytes(&self) -> usize { |
232 | 2 | self.max_stack_size_bytes.load(Ordering::Relaxed) |
233 | 2 | } |
234 | | |
235 | | #[inline] |
236 | 2 | pub fn get_max_buffer_budget_bytes(&self) -> usize { |
237 | 2 | self.max_buffer_budget_bytes.load(Ordering::Relaxed) |
238 | 2 | } |
239 | | |
240 | | #[inline] |
241 | 7 | pub fn get_state(&self) -> u8 { |
242 | 7 | self.state.load(Ordering::Acquire) |
243 | 7 | } |
244 | | |
245 | | #[inline] |
246 | 2 | pub fn get_shutdown_requested(&self) -> bool { |
247 | 2 | self.shutdown_requested.load(Ordering::Acquire) |
248 | 2 | } |
249 | | |
250 | | #[inline] |
251 | 3 | pub fn get_shutdown_mode(&self) -> u8 { |
252 | 3 | self.shutdown_mode.load(Ordering::Acquire) |
253 | 3 | } |
254 | | |
255 | | #[inline] |
256 | 2 | pub fn get_in_flight(&self) -> usize { |
257 | 2 | self.in_flight.load(Ordering::Relaxed) |
258 | 2 | } |
259 | | |
260 | | #[inline] |
261 | 1 | pub fn get_in_flight_sem(&self) -> usize { |
262 | 1 | self.in_flight_sem.available_permits() |
263 | 1 | } |
264 | | |
265 | | #[inline] |
266 | 3 | pub fn get_max_in_flight(&self) -> usize { |
267 | 3 | self.max_in_flight.load(Ordering::Relaxed) |
268 | 3 | } |
269 | | |
270 | | #[inline] |
271 | 2 | pub fn get_jobs_done(&self) -> u64 { |
272 | 2 | self.jobs_done.load(Ordering::Relaxed) |
273 | 2 | } |
274 | | |
275 | | #[inline] |
276 | 2 | pub fn get_jobs_failed(&self) -> u64 { |
277 | 2 | self.jobs_failed.load(Ordering::Relaxed) |
278 | 2 | } |
279 | | |
280 | | #[inline] |
281 | 2 | pub fn get_jobs_retried(&self) -> u64 { |
282 | 2 | self.jobs_retried.load(Ordering::Relaxed) |
283 | 2 | } |
284 | | |
285 | | #[inline] |
286 | 2 | pub fn get_last_job_started_ms(&self) -> u64 { |
287 | 2 | self.last_job_started_ms.load(Ordering::Relaxed) |
288 | 2 | } |
289 | | |
290 | | #[inline] |
291 | 2 | pub fn get_last_job_finished_ms(&self) -> u64 { |
292 | 2 | self.last_job_finished_ms.load(Ordering::Relaxed) |
293 | 2 | } |
294 | | |
295 | | #[inline] |
296 | 2 | pub fn get_last_error_code(&self) -> u64 { |
297 | 2 | self.last_error_code.load(Ordering::Relaxed) |
298 | 2 | } |
299 | | |
300 | | #[inline] |
301 | 2 | pub fn get_last_error_at_ms(&self) -> u64 { |
302 | 2 | self.last_error_at_ms.load(Ordering::Relaxed) |
303 | 2 | } |
304 | | |
305 | | #[inline] |
306 | 2 | pub fn get_last_task_id_hi(&self) -> u64 { |
307 | 2 | self.last_task_id_hi.load(Ordering::Relaxed) |
308 | 2 | } |
309 | | |
310 | | #[inline] |
311 | 2 | pub fn get_last_task_id_lo(&self) -> u64 { |
312 | 2 | self.last_task_id_lo.load(Ordering::Relaxed) |
313 | 2 | } |
314 | | |
315 | | #[inline] |
316 | 1 | pub fn set_worker_id(&self, worker_id: u64) { |
317 | 1 | self.worker_id.store(worker_id, Ordering::Relaxed); |
318 | 1 | } |
319 | | |
320 | | #[inline] |
321 | 13 | pub fn set_worker_type(&self, worker_type: WorkerType) { |
322 | 13 | self.worker_type.store(worker_type as u8, Ordering::Relaxed); |
323 | 13 | } |
324 | | |
325 | | #[inline] |
326 | 1 | pub async fn set_dns_tx(&self, tx: mpsc::Sender<InFlightTask>) { |
327 | 1 | self.dns_tx.lock().await.push(tx); |
328 | 1 | } |
329 | | |
330 | | #[inline] |
331 | 1 | pub async fn set_dns_rx(&self, rx: mpsc::Receiver<InFlightTask>) { |
332 | 1 | self.dns_rx.lock().await.push(rx); |
333 | 1 | } |
334 | | |
335 | | #[inline] |
336 | 1 | pub fn set_stack_size_bytes(&self, stack_size_bytes: usize) { |
337 | 1 | self.stack_size_bytes |
338 | 1 | .store(stack_size_bytes, Ordering::Relaxed); |
339 | 1 | } |
340 | | |
341 | | #[inline] |
342 | 1 | pub fn set_buffer_budget_bytes(&self, buffer_budget_bytes: usize) { |
343 | 1 | self.buffer_budget_bytes |
344 | 1 | .store(buffer_budget_bytes, Ordering::Relaxed); |
345 | 1 | } |
346 | | |
347 | | #[inline] |
348 | 1 | pub fn set_max_stack_size_bytes(&self, max_stack_size_bytes: usize) { |
349 | 1 | self.max_stack_size_bytes |
350 | 1 | .store(max_stack_size_bytes, Ordering::Relaxed); |
351 | 1 | } |
352 | | |
353 | | #[inline] |
354 | 1 | pub fn set_max_buffer_budget_bytes(&self, max_buffer_budget_bytes: usize) { |
355 | 1 | self.max_buffer_budget_bytes |
356 | 1 | .store(max_buffer_budget_bytes, Ordering::Relaxed); |
357 | 1 | } |
358 | | |
359 | | #[inline] |
360 | 24 | pub fn set_state(&self, state: WorkerState) { |
361 | 24 | self.state.store(state as u8, Ordering::Relaxed); |
362 | 24 | } |
363 | | |
364 | | #[inline] |
365 | 1 | pub fn set_shutdown_requested(&self, shutdown_requested: bool) { |
366 | 1 | self.shutdown_requested |
367 | 1 | .store(shutdown_requested, Ordering::Relaxed); |
368 | 1 | } |
369 | | |
370 | | #[inline] |
371 | 2 | pub fn set_shutdown_mode(&self, shutdown_mode: ShutdownMode) { |
372 | 2 | self.shutdown_mode |
373 | 2 | .store(shutdown_mode as u8, Ordering::Relaxed); |
374 | 2 | } |
375 | | |
376 | | #[inline] |
377 | 1 | pub fn set_in_flight(&self, in_flight: usize) { |
378 | 1 | self.in_flight.store(in_flight, Ordering::Relaxed); |
379 | 1 | } |
380 | | |
381 | | #[inline] |
382 | 5 | pub fn set_max_in_flight(&self, max_in_flight: usize) { |
383 | 5 | self.max_in_flight.store(max_in_flight, Ordering::Relaxed); |
384 | 5 | } |
385 | | |
386 | | #[inline] |
387 | 1 | pub fn set_jobs_done(&self, jobs_done: u64) { |
388 | 1 | self.jobs_done.store(jobs_done, Ordering::Relaxed); |
389 | 1 | } |
390 | | |
391 | | #[inline] |
392 | 1 | pub fn set_jobs_failed(&self, jobs_failed: u64) { |
393 | 1 | self.jobs_failed.store(jobs_failed, Ordering::Relaxed); |
394 | 1 | } |
395 | | |
396 | | #[inline] |
397 | 1 | pub fn set_jobs_retried(&self, jobs_retried: u64) { |
398 | 1 | self.jobs_retried.store(jobs_retried, Ordering::Relaxed); |
399 | 1 | } |
400 | | |
401 | | #[inline] |
402 | 1 | pub fn set_last_job_started_ms(&self, last_job_started_ms: u64) { |
403 | 1 | self.last_job_started_ms |
404 | 1 | .store(last_job_started_ms, Ordering::Relaxed); |
405 | 1 | } |
406 | | |
407 | | #[inline] |
408 | 1 | pub fn set_last_job_finished_ms(&self, last_job_finished_ms: u64) { |
409 | 1 | self.last_job_finished_ms |
410 | 1 | .store(last_job_finished_ms, Ordering::Relaxed); |
411 | 1 | } |
412 | | |
413 | | #[inline] |
414 | 1 | pub fn set_last_error_code(&self, last_error_code: u64) { |
415 | 1 | self.last_error_code |
416 | 1 | .store(last_error_code, Ordering::Relaxed); |
417 | 1 | } |
418 | | |
419 | | #[inline] |
420 | 1 | pub fn set_last_error_at_ms(&self, last_error_at_ms: u64) { |
421 | 1 | self.last_error_at_ms |
422 | 1 | .store(last_error_at_ms, Ordering::Relaxed); |
423 | 1 | } |
424 | | |
425 | | #[inline] |
426 | 1 | pub fn set_last_task_id_hi(&self, last_task_id_hi: u64) { |
427 | 1 | self.last_task_id_hi |
428 | 1 | .store(last_task_id_hi, Ordering::Relaxed); |
429 | 1 | } |
430 | | |
431 | | #[inline] |
432 | 1 | pub fn set_last_task_id_lo(&self, last_task_id_lo: u64) { |
433 | 1 | self.last_task_id_lo |
434 | 1 | .store(last_task_id_lo, Ordering::Relaxed); |
435 | 1 | } |
436 | | } |
437 | | |
/// Role of a worker; selects which loop `SCloudWorker::run` executes.
///
/// The explicit discriminants are what `SCloudWorker` stores in its
/// `worker_type` atomic, and they must stay in sync with the
/// `TryFrom<u8>` implementation for this enum.
#[repr(u8)]
#[allow(unused)]
#[allow(non_camel_case_types)]
#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize, Eq)]
pub enum WorkerType {
    /// No role assigned; `run` does nothing for this value.
    NONE = 99,
    LISTENER = 0,
    DECODER = 1,
    QUERY_DISPATCHER = 2,
    CACHE_LOOKUP = 3,
    ZONE_MANAGER = 4,
    RESOLVER = 5,
    CACHE_WRITER = 6,
    ENCODER = 7,
    SENDER = 8,

    CACHE_JANITOR = 9,

    METRICS = 10,
    TCP_ACCEPTOR = 11,
    DOH_ACCEPTOR = 12,
}
460 | | |
461 | | impl TryFrom<u8> for WorkerType { |
462 | | type Error = (); |
463 | | |
464 | 69 | fn try_from(v: u8) -> Result<Self, Self::Error> { |
465 | 69 | Ok(match v { |
466 | 7 | 0 => WorkerType::LISTENER, |
467 | 7 | 1 => WorkerType::DECODER, |
468 | 7 | 2 => WorkerType::QUERY_DISPATCHER, |
469 | 7 | 3 => WorkerType::CACHE_LOOKUP, |
470 | 7 | 4 => WorkerType::ZONE_MANAGER, |
471 | 7 | 5 => WorkerType::RESOLVER, |
472 | 7 | 6 => WorkerType::CACHE_WRITER, |
473 | 7 | 7 => WorkerType::ENCODER, |
474 | 4 | 8 => WorkerType::SENDER, |
475 | 1 | 9 => WorkerType::CACHE_JANITOR, |
476 | 1 | 10 => WorkerType::METRICS, |
477 | 5 | 11 => WorkerType::TCP_ACCEPTOR, |
478 | 0 | 12 => WorkerType::DOH_ACCEPTOR, |
479 | 2 | 99 => WorkerType::NONE, |
480 | | // TODO: return an SCloudException |
481 | 0 | _ => return Err(()), |
482 | | }) |
483 | 69 | } |
484 | | } |
485 | | |
/// Lifecycle state of a worker.
///
/// `SCloudWorker` stores the state as this enum's `u8` discriminant, so the
/// numeric values below and the `TryFrom<u8>` table must stay in sync.
/// The type is a plain C-like enum, hence `Copy`/`Eq` like `WorkerType`.
#[repr(u8)]
#[allow(unused)]
#[allow(non_camel_case_types)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum WorkerState {
    INIT = 0,
    IDLE = 1,
    BUSY = 2,
    PAUSED = 3,
    STOPPING = 4,
    STOPPED = 5,
}

/// Decodes a raw discriminant back into a `WorkerState`.
///
/// Values `0..=5` map to the variants above; any other byte yields `Err(())`.
impl TryFrom<u8> for WorkerState {
    type Error = ();

    fn try_from(v: u8) -> Result<Self, Self::Error> {
        match v {
            0 => Ok(Self::INIT),
            1 => Ok(Self::IDLE),
            2 => Ok(Self::BUSY),
            3 => Ok(Self::PAUSED),
            4 => Ok(Self::STOPPING),
            5 => Ok(Self::STOPPED),
            // TODO: return an SCloudException
            _ => Err(()),
        }
    }
}
515 | | |
/// How a worker is asked to shut down.
///
/// `SCloudWorker` stores the mode as this enum's `u8` discriminant, so the
/// numeric values below and the `TryFrom<u8>` table must stay in sync.
/// The type is a plain C-like enum, hence `Copy`/`Eq` like `WorkerType`.
#[repr(u8)]
#[allow(unused)]
#[allow(non_camel_case_types)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ShutdownMode {
    GRACEFUL = 0,
    IMMEDIATE = 1,
}

/// Decodes a raw discriminant back into a `ShutdownMode`.
///
/// `0` and `1` map to the variants above; any other byte yields `Err(())`.
impl TryFrom<u8> for ShutdownMode {
    type Error = ();

    fn try_from(v: u8) -> Result<Self, Self::Error> {
        match v {
            0 => Ok(Self::GRACEFUL),
            1 => Ok(Self::IMMEDIATE),
            // TODO: return an SCloudException
            _ => Err(()),
        }
    }
}
537 | | |
538 | 0 | pub fn spawn_worker( |
539 | 0 | worker: Arc<SCloudWorker>, |
540 | 0 | gate: Arc<StartGate>, |
541 | 0 | ) -> tokio::task::JoinHandle<()> { |
542 | 0 | tokio::spawn(async move { |
543 | 0 | gate.wait_turn(worker.get_worker_id()).await; |
544 | | |
545 | 0 | if let Err(e) = worker.clone().run(Some(gate.clone())).await { |
546 | 0 | log_error!("Worker {} failed: {:?}", worker.get_worker_id(), e); |
547 | 0 | } |
548 | | |
549 | 0 | gate.done().await; |
550 | 0 | }) |
551 | 0 | } |